1 /*
2 * Copyright (C) 2009 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
21
22 import com.google.common.annotations.Beta;
23
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 /**
31 * Utilities necessary for working with libraries that supply plain {@link
32 * Future} instances. Note that, whenver possible, it is strongly preferred to
33 * modify those libraries to return {@code ListenableFuture} directly.
34 *
35 * @author Sven Mawson
36 * @since 10.0 (replacing {@code Futures.makeListenable}, which
37 * existed in 1.0)
38 */
39 @Beta
40 public final class JdkFutureAdapters {
41 /**
42 * Assigns a thread to the given {@link Future} to provide {@link
43 * ListenableFuture} functionality.
44 *
45 * <p><b>Warning:</b> If the input future does not already implement {@code
46 * ListenableFuture}, the returned future will emulate {@link
47 * ListenableFuture#addListener} by taking a thread from an internal,
48 * unbounded pool at the first call to {@code addListener} and holding it
49 * until the future is {@linkplain Future#isDone() done}.
50 *
51 * <p>Prefer to create {@code ListenableFuture} instances with {@link
52 * SettableFuture}, {@link MoreExecutors#listeningDecorator(
53 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
54 * {@link AbstractFuture}, and other utilities over creating plain {@code
55 * Future} instances to be upgraded to {@code ListenableFuture} after the
56 * fact.
57 */
58 public static <V> ListenableFuture<V> listenInPoolThread(
59 Future<V> future) {
60 if (future instanceof ListenableFuture) {
61 return (ListenableFuture<V>) future;
62 }
63 return new ListenableFutureAdapter<V>(future);
64 }
65
66 /**
67 * Submits a blocking task for the given {@link Future} to provide {@link
68 * ListenableFuture} functionality.
69 *
70 * <p><b>Warning:</b> If the input future does not already implement {@code
71 * ListenableFuture}, the returned future will emulate {@link
72 * ListenableFuture#addListener} by submitting a task to the given executor at
73 * the first call to {@code addListener}. The task must be started by the
74 * executor promptly, or else the returned {@code ListenableFuture} may fail
75 * to work. The task's execution consists of blocking until the input future
76 * is {@linkplain Future#isDone() done}, so each call to this method may
77 * claim and hold a thread for an arbitrary length of time. Use of bounded
78 * executors or other executors that may fail to execute a task promptly may
79 * result in deadlocks.
80 *
81 * <p>Prefer to create {@code ListenableFuture} instances with {@link
82 * SettableFuture}, {@link MoreExecutors#listeningDecorator(
83 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
84 * {@link AbstractFuture}, and other utilities over creating plain {@code
85 * Future} instances to be upgraded to {@code ListenableFuture} after the
86 * fact.
87 *
88 * @since 12.0
89 */
90 public static <V> ListenableFuture<V> listenInPoolThread(
91 Future<V> future, Executor executor) {
92 checkNotNull(executor);
93 if (future instanceof ListenableFuture) {
94 return (ListenableFuture<V>) future;
95 }
96 return new ListenableFutureAdapter<V>(future, executor);
97 }
98
99 /**
100 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This
101 * will wait on the future to finish, and when it completes, run the
102 * listeners. This implementation will wait on the source future
103 * indefinitely, so if the source future never completes, the adapter will
104 * never complete either.
105 *
106 * <p>If the delegate future is interrupted or throws an unexpected unchecked
107 * exception, the listeners will not be invoked.
108 */
109 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
110 implements ListenableFuture<V> {
111
112 private static final ThreadFactory threadFactory =
113 new ThreadFactoryBuilder()
114 .setDaemon(true)
115 .setNameFormat("ListenableFutureAdapter-thread-%d")
116 .build();
117 private static final Executor defaultAdapterExecutor =
118 Executors.newCachedThreadPool(threadFactory);
119
120 private final Executor adapterExecutor;
121
122 // The execution list to hold our listeners.
123 private final ExecutionList executionList = new ExecutionList();
124
125 // This allows us to only start up a thread waiting on the delegate future
126 // when the first listener is added.
127 private final AtomicBoolean hasListeners = new AtomicBoolean(false);
128
129 // The delegate future.
130 private final Future<V> delegate;
131
132 ListenableFutureAdapter(Future<V> delegate) {
133 this(delegate, defaultAdapterExecutor);
134 }
135
136 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
137 this.delegate = checkNotNull(delegate);
138 this.adapterExecutor = checkNotNull(adapterExecutor);
139 }
140
141 @Override
142 protected Future<V> delegate() {
143 return delegate;
144 }
145
146 @Override
147 public void addListener(Runnable listener, Executor exec) {
148 executionList.add(listener, exec);
149
150 // When a listener is first added, we run a task that will wait for
151 // the delegate to finish, and when it is done will run the listeners.
152 if (hasListeners.compareAndSet(false, true)) {
153 if (delegate.isDone()) {
154 // If the delegate is already done, run the execution list
155 // immediately on the current thread.
156 executionList.execute();
157 return;
158 }
159
160 adapterExecutor.execute(new Runnable() {
161 @Override
162 public void run() {
163 try {
164 /*
165 * Threads from our private pool are never interrupted. Threads
166 * from a user-supplied executor might be, but... what can we do?
167 * This is another reason to return a proper ListenableFuture
168 * instead of using listenInPoolThread.
169 */
170 getUninterruptibly(delegate);
171 } catch (Error e) {
172 throw e;
173 } catch (Throwable e) {
174 // ExecutionException / CancellationException / RuntimeException
175 // The task is done, run the listeners.
176 }
177 executionList.execute();
178 }
179 });
180 }
181 }
182 }
183
184 private JdkFutureAdapters() {}
185 }